Send envelopes in batches #253

Merged
merged 4 commits into from
Oct 24, 2024

Conversation

richardhuaaa
Contributor

@richardhuaaa richardhuaaa commented Oct 23, 2024

When streaming envelopes, send them in batches rather than one-by-one. We use a different batching strategy based on whether the subscription is originator/topic/global.

#255

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced batch processing for envelope sending, improving performance.
    • Enhanced listener and subscription mechanisms with better error handling and logging.
    • Added new methods to various envelope structs for improved topic management.
    • Introduced a new utility function for creating originator envelopes with specified topics.
  • Bug Fixes

    • Improved handling of invalid topics and ensured proper cursor management.
  • Tests

    • Updated test cases to utilize new topic variables for consistency and readability.


coderabbitai bot commented Oct 23, 2024

Walkthrough

The changes in this pull request primarily focus on enhancing the handling and processing of envelopes within the messaging service. Key modifications include the refactoring of the sendEnvelopes method to send envelopes in batches rather than individually, improvements in the subscription worker's dispatching logic, and the introduction of new methods for managing topics within various envelope structs. Additionally, test files have been updated to ensure consistency in topic handling. Overall, the modifications aim to streamline the envelope processing and improve the maintainability of the code.

Changes

| File | Change Summary |
| --- | --- |
| pkg/api/message/service.go | Refactored sendEnvelopes to send envelopes in a single response; adjusted cursor handling. |
| pkg/api/message/subscribeWorker.go | Added logger to newListener, replaced topic encoding with validation; refactored dispatching into three methods. |
| pkg/api/message/subscribe_test.go | Updated tests to use new topic variables instead of byte slices for better consistency. |
| pkg/api/query_test.go | Similar updates to use new topic variables across test cases for consistency. |
| pkg/db/subscription_test.go | Introduced a new topic variable topicA to replace hardcoded byte slices in test functions. |
| pkg/envelopes/originator.go | Added TargetTopic method to OriginatorEnvelope. |
| pkg/envelopes/payer.go | Added TargetTopic method to PayerEnvelope. |
| pkg/envelopes/unsignedOriginator.go | Added TargetTopic method to UnsignedOriginatorEnvelope. |
| pkg/testutils/envelopes/envelopes.go | Added CreateOriginatorEnvelopeWithTopic function for creating envelopes with specific topics. |

Possibly related PRs


@richardhuaaa richardhuaaa marked this pull request as ready for review October 23, 2024 20:26
@richardhuaaa richardhuaaa requested a review from a team as a code owner October 23, 2024 20:26

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (6)
pkg/envelopes/unsignedOriginator.go (1)

60-62: Consider adding method documentation.

Adding documentation would help clarify the purpose and return value of the TargetTopic method.

+// TargetTopic returns the topic associated with this envelope by delegating to the underlying PayerEnvelope
 func (u *UnsignedOriginatorEnvelope) TargetTopic() topic.Topic {
     return u.PayerEnvelope.TargetTopic()
 }
pkg/testutils/envelopes/envelopes.go (1)

123-138: LGTM! Consider adding LastSeen parameter for flexibility.

The implementation is clean and follows the existing patterns. However, you might want to consider adding an optional LastSeen parameter to support different test scenarios.

Consider this enhancement for future flexibility:

 func CreateOriginatorEnvelopeWithTopic(
 	t *testing.T,
 	originatorNodeID uint32,
 	originatorSequenceID uint64,
 	topic []byte,
+	lastSeen *envelopes.VectorClock,
 ) *envelopes.OriginatorEnvelope {
 	payerEnv := CreatePayerEnvelope(t, CreateClientEnvelope(
 		&envelopes.AuthenticatedData{
 			TargetTopic:      topic,
 			TargetOriginator: originatorNodeID,
-			LastSeen:         nil,
+			LastSeen:         lastSeen,
 		},
 	))
pkg/api/message/subscribe_test.go (2)

21-25: Consider keeping topics as Topic type for better debugging.

While converting topics to bytes works, keeping them as Topic type until needed would preserve their semantic meaning during test execution and debugging. This would make it easier to inspect topic properties in test failures.

 var (
-	topicA = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicA")).Bytes()
-	topicB = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicB")).Bytes()
-	topicC = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicC")).Bytes()
+	topicA = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicA"))
+	topicB = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicB"))
+	topicC = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicC"))
 )

Line range hint 1-286: Consider adding tests for batch size limits.

While the current test coverage is good, consider adding test cases that verify:

  • Maximum batch size handling
  • Performance with large numbers of topics
  • Behavior when approaching system resource limits

This would ensure the batching implementation remains robust under stress.

Would you like me to help create these additional test cases?

pkg/api/message/service.go (1)

183-198: Consider adding a maximum batch size limit.

While batching improves performance by reducing the number of stream.Send calls, consider adding a maximum batch size limit to prevent potential memory issues with very large sets of envelopes. This is particularly important in streaming scenarios where the number of envelopes could be substantial.

Consider:

  1. Adding a constant for max batch size
  2. Splitting into multiple sends when the batch size exceeds the limit
 const (
        maxRequestedRows     int32         = 1000
        maxQueriesPerRequest int           = 10000
        maxTopicLength       int           = 128
        maxVectorClockLength int           = 100
        pagingInterval       time.Duration = 100 * time.Millisecond
+       maxBatchSize        int           = 1000
 )

 func (s *Service) sendEnvelopes(
        stream message_api.ReplicationApi_SubscribeEnvelopesServer,
        query *message_api.EnvelopesQuery,
        envs []*envelopes.OriginatorEnvelope,
 ) error {
        cursor := query.GetLastSeen().GetNodeIdToSequenceId()
        if cursor == nil {
                cursor = make(map[uint32]uint64)
                query.LastSeen = &envelopesProto.VectorClock{
                        NodeIdToSequenceId: cursor,
                }
        }

        envsToSend := make([]*envelopesProto.OriginatorEnvelope, 0, len(envs))
        for _, env := range envs {
                if cursor[uint32(env.OriginatorNodeID())] >= env.OriginatorSequenceID() {
                        continue
                }

                envsToSend = append(envsToSend, env.Proto())
                cursor[uint32(env.OriginatorNodeID())] = env.OriginatorSequenceID()

+               // Send batch if it reaches the maximum size
+               if len(envsToSend) >= maxBatchSize {
+                       if err := stream.Send(&message_api.SubscribeEnvelopesResponse{
+                               Envelopes: envsToSend,
+                       }); err != nil {
+                               return status.Errorf(codes.Internal, "error sending envelopes: %v", err)
+                       }
+                       envsToSend = make([]*envelopesProto.OriginatorEnvelope, 0, maxBatchSize)
+               }
        }

+       // Send remaining envelopes
+       if len(envsToSend) > 0 {
                if err := stream.Send(&message_api.SubscribeEnvelopesResponse{
                        Envelopes: envsToSend,
                }); err != nil {
                        return status.Errorf(codes.Internal, "error sending envelopes: %v", err)
                }
+       }
        return nil
 }
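The splitting step in this suggestion can also be isolated into a small helper, which keeps the send loop short. The following is only a sketch: `chunk` is a hypothetical generic function and the batch size of 3 is an illustrative value, neither of which appears in the PR.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// Hypothetical helper for illustration; it is not part of the PR.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	batches := make([][]T, 0, (len(items)+size-1)/size)
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		// Each batch is a sub-slice that shares the backing array of items.
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	const maxBatchSize = 3 // illustrative value, not taken from the PR
	envelopes := []int{1, 2, 3, 4, 5, 6, 7}
	for _, batch := range chunk(envelopes, maxBatchSize) {
		// In the service, each batch would go out in one stream.Send call.
		fmt.Println(batch)
	}
}
```

Because the batches alias the input slice, this avoids copying, but the caller should not modify the input while batches are still in flight.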
pkg/api/message/subscribeWorker.go (1)

Line range hint 279-283: Re-evaluate removing listeners when channels are full

Immediately removing listeners when their channels are full might lead to unintended disconnections under high load. Consider implementing backpressure handling, such as buffering messages, dropping the oldest messages, or notifying the listener before removal.
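One of the options mentioned here, dropping the oldest message, could look roughly like the sketch below. It assumes a buffered channel with a single producer; `offerDropOldest` is a hypothetical name and this is not how the PR's subscribe worker behaves.

```go
package main

import "fmt"

// offerDropOldest tries to enqueue v without blocking. If the buffer is
// full, it discards the oldest queued value and retries once. It reports
// whether an old value was discarded.
// Assumes ch is buffered and has a single producer (illustration only).
func offerDropOldest[T any](ch chan T, v T) (dropped bool) {
	select {
	case ch <- v:
		return false
	default:
	}
	// Buffer is full: evict the oldest element if it is still queued.
	select {
	case <-ch:
		dropped = true
	default:
	}
	// Retry once; give up silently if the buffer filled up again.
	select {
	case ch <- v:
	default:
	}
	return dropped
}

func main() {
	ch := make(chan int, 2)
	for i := 1; i <= 3; i++ {
		if offerDropOldest(ch, i) {
			fmt.Println("dropped an old message while enqueueing", i)
		}
	}
	close(ch)
	for v := range ch {
		fmt.Println("received", v)
	}
}
```

Whether silently losing messages is acceptable depends on the subscription's delivery guarantees. For a replication stream, closing the slow listener (as the current code does) may be the safer choice.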

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 88e7845 and fe6829f.

📒 Files selected for processing (9)
  • pkg/api/message/service.go (1 hunks)
  • pkg/api/message/subscribeWorker.go (8 hunks)
  • pkg/api/message/subscribe_test.go (7 hunks)
  • pkg/api/query_test.go (6 hunks)
  • pkg/db/subscription_test.go (3 hunks)
  • pkg/envelopes/originator.go (2 hunks)
  • pkg/envelopes/payer.go (2 hunks)
  • pkg/envelopes/unsignedOriginator.go (2 hunks)
  • pkg/testutils/envelopes/envelopes.go (1 hunks)
🧰 Additional context used
🔇 Additional comments (23)
pkg/envelopes/payer.go (2)

9-9: LGTM!

The topic package import is appropriately placed and necessary for the new functionality.


60-62: LGTM!

The TargetTopic method is well-implemented as a clean delegation to the underlying ClientEnvelope, maintaining consistency with similar implementations across other envelope types. This addition supports the PR's objective of enabling batch processing by providing topic information for envelope routing.

pkg/envelopes/originator.go (2)

7-7: LGTM: Import addition is appropriate.

The addition of the topic package import is necessary for the new TargetTopic method's return type.


61-63: LGTM: Clean implementation of TargetTopic method.

The method follows the established pattern of delegating to the UnsignedOriginatorEnvelope, maintaining consistency with other envelope types. This addition supports the PR's objective of batch processing by providing topic information for envelope grouping.

Let's verify the consistency of TargetTopic implementation across envelope types:

✅ Verification successful

Let me try a different approach to verify the TargetTopic implementations.

This should help us see both the interface definitions and actual implementations of TargetTopic across different envelope types. The -A 3 flag will show some context after each match to help us analyze the implementation patterns.


Verified: TargetTopic implementation follows consistent delegation pattern

The implementation in OriginatorEnvelope correctly follows the established delegation chain pattern seen across all envelope types:

  • OriginatorEnvelope delegates to UnsignedOriginatorEnvelope
  • UnsignedOriginatorEnvelope delegates to PayerEnvelope
  • PayerEnvelope delegates to ClientEnvelope
  • ClientEnvelope provides the actual implementation by returning targetTopic

This forms a clean delegation chain that maintains consistency across the envelope hierarchy, making the code predictable and maintainable.
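A stripped-down sketch of that delegation chain is shown below. The struct layouts and the string-based `Topic` type are simplified assumptions for illustration; the real envelope types wrap protobuf messages and return `topic.Topic`.

```go
package main

import "fmt"

// Topic stands in for the project's topic.Topic type (assumption).
type Topic string

// ClientEnvelope holds the topic and provides the actual implementation.
type ClientEnvelope struct{ targetTopic Topic }

func (c *ClientEnvelope) TargetTopic() Topic { return c.targetTopic }

// PayerEnvelope delegates to its ClientEnvelope.
type PayerEnvelope struct{ ClientEnvelope ClientEnvelope }

func (p *PayerEnvelope) TargetTopic() Topic { return p.ClientEnvelope.TargetTopic() }

// UnsignedOriginatorEnvelope delegates to its PayerEnvelope.
type UnsignedOriginatorEnvelope struct{ PayerEnvelope PayerEnvelope }

func (u *UnsignedOriginatorEnvelope) TargetTopic() Topic {
	return u.PayerEnvelope.TargetTopic()
}

// OriginatorEnvelope delegates to its UnsignedOriginatorEnvelope.
type OriginatorEnvelope struct {
	UnsignedOriginatorEnvelope UnsignedOriginatorEnvelope
}

func (o *OriginatorEnvelope) TargetTopic() Topic {
	return o.UnsignedOriginatorEnvelope.TargetTopic()
}

func main() {
	env := &OriginatorEnvelope{
		UnsignedOriginatorEnvelope: UnsignedOriginatorEnvelope{
			PayerEnvelope: PayerEnvelope{
				ClientEnvelope: ClientEnvelope{targetTopic: "topicA"},
			},
		},
	}
	// The outermost envelope answers by walking down to the ClientEnvelope.
	fmt.Println(env.TargetTopic())
}
```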

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify TargetTopic method implementation across envelope types
# Expected: All envelope types should have consistent TargetTopic implementations

# Search for TargetTopic method implementations
ast-grep --pattern 'func $_($_) TargetTopic() topic.Topic {
  $$$
}'

Length of output: 70


Script:

#!/bin/bash
# Search for TargetTopic method implementations using ripgrep
# This will help us find all occurrences across envelope types
rg "TargetTopic\(\)" -A 3 pkg/envelopes/

# Also search for any interfaces defining TargetTopic
rg "interface.*\{.*TargetTopic.*\}" pkg/envelopes/

Length of output: 1046

pkg/envelopes/unsignedOriginator.go (2)

7-7: LGTM!

The topic package import is correctly added and necessary for the new TargetTopic method.


60-62: LGTM! Implementation aligns with PR objectives.

The TargetTopic method correctly delegates to PayerEnvelope, supporting the envelope batching functionality by providing consistent topic access across different envelope types.

pkg/testutils/envelopes/envelopes.go (1)

123-138: Verify usage in batch-related tests.

Let's verify that this new function is being used appropriately in the batch-related test files.

✅ Verification successful

Let me analyze the results and generate the final response.

Function is properly integrated in test files

The CreateOriginatorEnvelopeWithTopic function is being used appropriately in two test files:

  • pkg/api/query_test.go
  • pkg/api/message/subscribe_test.go

In both files, the function is used consistently to create test envelopes with different combinations of:

  • Originator node IDs (1 and 2)
  • Sequence IDs (1, 2, and 3)
  • Topics (topicA and topicB)

The usage pattern shows proper integration for testing query and subscription scenarios with varying parameters, which is exactly what this utility function was designed for.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find test files using this new function to verify proper integration
# with batch-related tests.

echo "Searching for test files using CreateOriginatorEnvelopeWithTopic..."
rg -l "CreateOriginatorEnvelopeWithTopic" --type go --type-add 'go:*.go' | grep "_test.go"

echo -e "\nChecking the context of usage..."
rg -B 2 -A 2 "CreateOriginatorEnvelopeWithTopic" --type go --type-add 'go:*.go'

Length of output: 3489

pkg/db/subscription_test.go (3)

14-14: LGTM: Required import for topic handling.

The added import is necessary for the new topic creation functionality.


34-34: LGTM: Consistent topic usage across test data.

The test data consistently uses the properly constructed topic variable, improving maintainability and type safety across all test cases.

Note: This change aligns well with the PR's objective of batch processing envelopes, as it ensures proper topic structuring which is crucial for correct envelope routing.

Also applies to: 40-40, 68-68, 74-74, 80-80


18-19: LGTM: Improved topic handling with centralized definition.

Good refactoring to use a properly constructed topic instead of raw byte slices. This improves type safety and maintainability.

Let's verify the topic kind usage is consistent across the codebase:

✅ Verification successful

Based on the search results, I can now generate the final response as we have comprehensive information about the usage of TOPIC_KIND_GROUP_MESSAGES_V1 across the codebase.

Topic kind usage is consistent and appropriate

The verification shows that TOPIC_KIND_GROUP_MESSAGES_V1 is used consistently throughout the codebase for group message-related functionality:

  • It's properly defined in pkg/topic/topic.go
  • Used consistently in test utilities and test files
  • Correctly handled in core functionality like pkg/indexer/storer/groupMessage.go and pkg/api/payer/service.go
  • The usage in subscription_test.go aligns with the established patterns across the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if TOPIC_KIND_GROUP_MESSAGES_V1 is used consistently in tests
rg -A 2 "TOPIC_KIND_GROUP_MESSAGES_V1" --type go

Length of output: 4253

pkg/api/query_test.go (7)

16-22: LGTM: Well-structured topic handling refactor.

The introduction of centralized topic variables using topic.NewTopic improves code maintainability and consistency. The consistent use of TOPIC_KIND_GROUP_MESSAGES_V1 across all topics is appropriate.


30-69: LGTM: Improved test data setup with explicit topic handling.

The refactoring from direct byte slices to using CreateOriginatorEnvelopeWithTopic makes the test setup more explicit and maintainable. The consistent use of topic variables across test cases strengthens the test suite's reliability.


137-137: LGTM: Consistent topic handling in query test.

The update to use the new topic variable maintains consistency with the refactored approach.


174-174: LGTM: Proper topic handling in vector clock test.

The update maintains consistency while properly testing the interaction between topic filtering and vector clock-based queries.


195-195: LGTM: Well-structured test for multi-topic batching scenario.

The test case effectively validates querying multiple topics with vector clock, which is particularly relevant for the PR's envelope batching objective.


237-237: LGTM: Appropriate use of unused topic for empty result test.

Using topicC effectively tests the empty result case while maintaining consistency with the new topic handling approach.


255-256: LGTM: Proper validation of invalid query conditions.

The test effectively validates error handling when combining topic and originator filters while maintaining consistency with the new topic handling approach.

pkg/api/message/subscribe_test.go (2)

34-74: LGTM! Test setup properly creates diverse test scenarios.

The setup creates a good mix of test data with different originators and topics, which is essential for testing the new batching functionality. The consistent use of topic variables and CreateOriginatorEnvelopeWithTopic makes the test setup clear and maintainable.


149-149: LGTM! Comprehensive test coverage for subscription scenarios.

The test cases effectively cover:

  • Topic-based filtering
  • Multiple simultaneous subscriptions
  • Cursor-based pagination
  • Invalid request handling

This ensures the robustness of the envelope batching implementation.

Let's verify if we have test coverage for all edge cases:

Also applies to: 201-201, 236-236, 258-258, 277-277

pkg/api/message/service.go (1)

183-191: LGTM! Efficient envelope batching implementation.

The implementation efficiently accumulates envelopes with proper cursor handling and memory pre-allocation.
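The cursor handling referred to here can be illustrated in isolation. This sketch mirrors the filtering loop quoted earlier in the review, but uses a simplified `envelope` struct and a hypothetical `filterNew` function rather than the project's types.

```go
package main

import "fmt"

// envelope is a simplified stand-in for the project's OriginatorEnvelope
// (assumption for illustration).
type envelope struct {
	nodeID     uint32
	sequenceID uint64
}

// filterNew returns the envelopes whose sequence ID is beyond the cursor
// for their originator node, advancing the cursor as it goes. The result
// is pre-allocated to the input length, as in the reviewed code.
func filterNew(cursor map[uint32]uint64, envs []envelope) []envelope {
	out := make([]envelope, 0, len(envs))
	for _, env := range envs {
		if cursor[env.nodeID] >= env.sequenceID {
			continue // already delivered to this subscriber
		}
		out = append(out, env)
		cursor[env.nodeID] = env.sequenceID
	}
	return out
}

func main() {
	// The subscriber has already seen sequence 2 from node 1.
	cursor := map[uint32]uint64{1: 2}
	envs := []envelope{{1, 1}, {1, 2}, {1, 3}, {2, 1}, {2, 1}}

	batch := filterNew(cursor, envs)
	fmt.Println("to send:", batch)
	fmt.Println("cursor: ", cursor)
}
```

Only node 1's sequence 3 and the first copy of node 2's sequence 1 survive the filter, so a single response can carry the whole batch without resending anything.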

pkg/api/message/subscribeWorker.go (3)

Line range hint 34-60: Good use of topic validation and logging in newListener

The addition of topic.ParseTopic for topic validation and incorporation of logging enhances the robustness and debuggability of the listener initialization process.


131-135: Ensure type assertions in rangeKeys are safe

In the rangeKeys method, type assertions are used without checks. Verify that these assertions (key.(K) and value.(*listenerSet)) are safe and cannot cause panics due to unexpected types.

Run the following script to check for potential issues with type assertions:

#!/bin/bash
# Description: Ensure that all type assertions in listenersMap are safe.

# Test: Search for type assertions in listenersMap methods.
rg 'listenersMap\[.*\]' -A 10 | rg '\.\(.*\)'
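For reference, a defensive variant of such a range method would use the comma-ok form of the type assertion. The sketch below assumes a generic wrapper around `sync.Map`; `typedMap` and its fields are hypothetical and may not match the actual `listenersMap` implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// typedMap is a hypothetical generic wrapper around sync.Map.
type typedMap[K comparable, V any] struct {
	data sync.Map
}

// Store saves a correctly typed entry.
func (m *typedMap[K, V]) Store(key K, value V) { m.data.Store(key, value) }

// rangeKeys calls fn for every entry. Entries whose dynamic types do not
// match K and V are skipped instead of causing a panic.
func (m *typedMap[K, V]) rangeKeys(fn func(key K, value V) bool) {
	m.data.Range(func(rawKey, rawValue any) bool {
		key, ok := rawKey.(K)
		if !ok {
			return true // skip unexpected key type, keep iterating
		}
		value, ok := rawValue.(V)
		if !ok {
			return true // skip unexpected value type, keep iterating
		}
		return fn(key, value)
	})
}

func main() {
	var m typedMap[string, int]
	m.Store("a", 1)
	m.Store("b", 2)
	// Simulate a bad entry written directly to the underlying map.
	m.data.Store(42, "oops")

	m.rangeKeys(func(key string, value int) bool {
		fmt.Println(key, value)
		return true
	})
}
```

If every write goes through a typed `Store` method, the unchecked assertions are safe in practice, and the comma-ok form mainly guards against future code paths that bypass the wrapper.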

310-312: Ensure all callers of newListener are updated with the new logger parameter

With the addition of the logger parameter to newListener, verify that all calls to this function pass the appropriate logger to prevent potential nil pointer dereferences or missing logs.

Run the following script to find all calls to newListener and check for the logger parameter:

✅ Verification successful

Let me gather more context about the newListener function definition and its usage by generating additional shell scripts.


Based on the gathered information, I can now generate the final response:

The newListener function call is correctly updated with the logger parameter

The verification shows that:

  1. The newListener function signature includes the logger parameter of type *zap.Logger
  2. The only call to newListener in the codebase is in subscribeWorker.listen() method, which correctly passes s.log as the logger parameter
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all calls to `newListener` and ensure they include the `logger` parameter.

# Test: Search for `newListener` function calls. Expect: All calls pass `logger` as an argument.
rg 'newListener\(' -A 1

Length of output: 656


Script:

#!/bin/bash
# Get the full definition of newListener function to understand its parameters
ast-grep --pattern 'func newListener($_) {
  $$$
}'

# Get more context around the usage of newListener
rg 'newListener\(' -B 5 -A 5

Length of output: 1483

@richardhuaaa richardhuaaa enabled auto-merge (squash) October 24, 2024 00:36
@richardhuaaa richardhuaaa merged commit 51cbe4f into main Oct 24, 2024
8 checks passed
@richardhuaaa richardhuaaa deleted the 10-21-rich_batch-stream branch October 24, 2024 21:20
@coderabbitai coderabbitai bot mentioned this pull request Oct 25, 2024

2 participants